上一篇:Day 10 - 資料的舞太 Elixir Broadway (上)說到工程師們運用了 Gen Stage 的技術,做出了 Broadway ,但他跟 Gen Stage 到底有什麼不同呢?
最主要的分別:
那設定上會不會很困難呢?其實非常的簡單,你只要寫好一個設定的 function,然後指定好誰當 Producer,誰當Consumer ,Pipeline 就可以啟動了!
先讓我們寫好設定檔案:
defmodule BroadwayDemo do
  use Broadway
  def start_link(_opts) do
    Broadway.start_link(BroadwayDemo,
      name: BroadwayDemo,
      producer: [
        module: {Counter, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ]
    )
  end
  ...callbacks...
end
上面的 producer的部分就是產出資料的地方,指定好是哪一個 Module 負責,並且寫上要開幾個 process(上面
concurrency:1 表示開一個 process),這樣就設定好Producer了;另外Processors的部分可以視為Consumer,這邊可以看到他指定要啟動兩個 process。
如果你想要讓資料可以批次處理,那在設定檔上面再加上 batcher的部分:
 defmodule BroadwayDemo do
  use Broadway
  def start_link(_opts) do
    Broadway.start_link(BroadwayDemo,
      name: BroadwayDemo,
      producer: [
        module: {Counter, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ],
       batchers: [
        sqs: [concurrency: 2, batch_size: 10],
        s3: [concurrency: 1, batch_size: 10]
      ]
    )
  end
  ...callbacks...
end
在 batcher 上面我們可以看到我們有兩個 batcher,一個是sqs,另一個是s3,而每個批次以十個為一單位。
在設定寫好之後,就要來看資料進來之後該怎麼做啦!如果你只有processor的話,只需要在 module 裡面加入
handle_message/3,這樣一來 processor 就知道該怎麼處理進來的資料,例如:
def handle_message(_, %Message{data: data} = message, _) do
    IO.inspect(message)
    message
end
如果有 batcher的話,就需要加入 handle_batch/4,並且在 handle_message裡面跟elixir講說哪一筆資料要去哪一個 batcher:
  def handle_message(_, %Message{data: data} = message, _) when is_odd(data) do
    message
    |> Message.update_data(&process_data/1) #在 handle_message中也可以在對資料進行處理
    |> Message.put_batcher(:sqs)
  end
  def handle_message(_, %Message{data: data} = message, _) when is_even(data) do
    message
    |> Message.update_data(&process_data/1)
    |> Message.put_batcher(:s3)
  end
  defp process_data(data) do
    # 處理資料!
  end
  @impl true
  def handle_batch(:sqs, messages, _batch_info, _context) do
    # 將資料送到 SQS,進行後續的處理
  end
  def handle_batch(:s3, messages, _batch_info, _context) do
    # 將資料存入S3
  end
在我的公司,我做的專案裡面,我跟我同事運用了 Broadway,每五分鐘引入每個加密幣的資訊,存入S3,將最新的一筆資訊加入各個不同時間區間的價格走勢圖;也在這段時間內重新計算各個加密幣的排名(根據市值)
另外為什麼這個套件要叫做 Broadway呢?因為這個套件裡面有很多 Producer (監製),很多 stage (舞台)(原本是開多少個 process的key,後來避免混淆改成 concurrency,個人覺得比較可惜XD),所以工程師把它命名成 Broadway XD